Node.js Worker Threads Module

Node.js -

কর্মী থ্রেড কি?

Worker Threads হল Node.js-এ প্রবর্তিত একটি বৈশিষ্ট্য (প্রথম v10.5.0-এ একটি পরীক্ষামূলক বৈশিষ্ট্য হিসেবে এবং v12-এ স্থিতিশীল) যা জাভাস্ক্রিপ্ট কোডকে একাধিক CPU কোর জুড়ে সমান্তরালভাবে চলতে দেয়।

চাইল্ড_প্রসেস বা ক্লাস্টার মডিউলের বিপরীতে, যা আলাদা Node.js প্রসেস তৈরি করে, ওয়ার্কার থ্রেড মেমরি শেয়ার করতে পারে এবং জাভাস্ক্রিপ্ট কোডকে সত্যিকারের সমান্তরালে চালাতে পারে।

Node.js Worker Threads CPU- Node.js - .

Node.js I/O- , CPU- .

💡দ্রষ্টব্য:

কর্মী থ্রেডগুলি ব্রাউজারে ওয়েব ওয়ার্কারদের থেকে আলাদা, যদিও তারা একই ধারণাগুলি ভাগ করে। Node.js ওয়ার্কার থ্রেড বিশেষভাবে Node.js রানটাইম পরিবেশের জন্য ডিজাইন করা হয়েছে।

কর্মী থ্রেড কখন ব্যবহার করবেন

কর্মী থ্রেড খুব দরকারী:

CPU- নিবিড় অপারেশন (বড় গণনা, ডেটা প্রক্রিয়াকরণ)
ডেটা সহ-প্রক্রিয়াকরণ
ফাংশন যা মূল থ্রেড ব্লক করতে পারে

এগুলোর প্রয়োজন নেই:

I/O-বাউন্ড অপারেশন (ফাইল সিস্টেম, নেটওয়ার্ক)
যে ফাংশনগুলি ইতিমধ্যেই অ্যাসিঙ্ক্রোনাস API ব্যবহার করে
সহজ কাজ যা দ্রুত সম্পন্ন করা যায়

ওয়ার্কার থ্রেড মডিউল আমদানি করা হচ্ছে

ওয়ার্কার থ্রেড মডিউলটি ডিফল্টরূপে Node.js-এ অন্তর্ভুক্ত থাকে। আপনি প্রয়োজন করে আপনার স্ক্রিপ্টে এটি ব্যবহার করতে পারেন:

const {
  Worker,
  isMainThread,
  parentPort,
  workerData
} = require('worker_threads');

মূল উপাদান

বল ব্যাখ্যা
Worker নতুন কর্মী থ্রেড তৈরির জন্য একটি ক্লাস
isMainThread একটি বুলিয়ান যা সত্য যদি কোডটি মূল থ্রেডে চলছে এবং যদি এটি কোনও শ্রমিকের উপর চলছে তাহলে মিথ্যা
parentPort যদি এই থ্রেডটি একজন কর্মী হয় তবে এটি একটি মেসেজপোর্ট যা মূল থ্রেডের সাথে যোগাযোগের অনুমতি দেয়
workerData কর্মী থ্রেড তৈরি করার সময় ডেটা পাস হয়েছে
MessageChannel একটি যোগাযোগ চ্যানেল তৈরি করে (লিঙ্কযুক্ত মেসেজপোর্ট অবজেক্টের একটি জোড়া)
MessagePort থ্রেডের মধ্যে বার্তা প্রেরণের জন্য একটি ইন্টারফেস
threadId বর্তমান থ্রেডের জন্য একটি অনন্য শনাক্তকারী

আপনার প্রথম কর্মী থ্রেড তৈরি করা হচ্ছে

আসুন একটি সাধারণ উদাহরণ তৈরি করি যেখানে প্রধান থ্রেড একটি সিপিইউ-নিবিড় কাজ সম্পাদন করার জন্য একজন কর্মী তৈরি করে:

// main.js
const { Worker } = require('worker_threads');

// Function to create a new worker
function runWorker(workerData) {
  return new Promise((resolve, reject) => {
    // Create a new worker
    const worker = new Worker('./worker.js', { workerData });
    
    // Listen for messages from the worker
    worker.on('message', resolve);
    
    // Listen for errors
    worker.on('error', reject);
    
    // Listen for worker exit
    worker.on('exit', (code) => {
      if (code !== 0) {
        reject(new Error(`Worker stopped with exit code ${code}`));
      }
    });
  });
}

// Run the worker
async function run() {
  try {
    // Send data to the worker and get the result
    const result = await runWorker('Hello from main thread!');
    console.log('Worker result:', result);
  } catch (err) {
    console.error('Worker error:', err);
  }
}

run().catch(err => console.error(err));
// worker.js
const { parentPort, workerData } = require('worker_threads');

// Receive message from the main thread
console.log('Worker received:', workerData);

// Simulate CPU-intensive task
function performCPUIntensiveTask() {
  // Simple example: Sum up to a large number
  let result = 0;
  for (let i = 0; i < 1_000_000; i++) {
    result += i;
  }
  return result;
}

// Perform the task
const result = performCPUIntensiveTask();

// Send the result back to the main thread
parentPort.postMessage({
  receivedData: workerData,
  calculatedSum: result
});

এই উদাহরণে:

মূল থ্রেড কিছু প্রাথমিক তথ্য সহ একজন কর্মী তৈরি করে
কর্মী CPU-নিবিড় গণনা সঞ্চালন করে
কর্মী ফলাফলটি মূল থ্রেডে পাঠায়
প্রধান থ্রেড ফলাফল গ্রহণ করে এবং প্রক্রিয়া করে

উদাহরণে মূল ধারণা

ওয়ার্কার কনস্ট্রাক্টর ওয়ার্কার স্ক্রিপ্ট এবং অপশন অবজেক্টের পথ নেয়
workerData বিকল্পটি কর্মীকে প্রাথমিক তথ্য প্রেরণ করতে ব্যবহৃত হয়
কর্মী প্রধান থ্রেডের সাথে parentPort.postMessage() ব্যবহার করে যোগাযোগ করে
ইভেন্ট হ্যান্ডলার (বার্তা, ত্রুটি, প্রস্থান) কর্মীর জীবনচক্র পরিচালনা করতে ব্যবহৃত হয়

পাঠ্যের মধ্যে যোগাযোগ

কর্মী থ্রেড বার্তা পাস করে যোগাযোগ করে।

যোগাযোগ দ্বি-দিকনির্দেশক, যার অর্থ প্রধান থ্রেড এবং কর্মীরা উভয়ই বার্তা পাঠাতে এবং গ্রহণ করতে পারে।

মূল সুতো থেকে কর্মী পর্যন্ত

// main.js
const { Worker } = require('worker_threads');

// Create a worker
const worker = new Worker('./message_worker.js');

// Send messages to the worker
worker.postMessage('Hello worker!');
worker.postMessage({ type: 'task', data: [1, 2, 3, 4, 5] });

// Receive messages from the worker
worker.on('message', (message) => {
  console.log('Main thread received:', message);
});

// Handle worker completion
worker.on('exit', (code) => {
  console.log(`Worker exited with code ${code}`);
});
// message_worker.js
const { parentPort } = require('worker_threads');

// Receive messages from the main thread
parentPort.on('message', (message) => {
  console.log('Worker received:', message);
  
  // Process different message types
  if (typeof message === 'object' && message.type === 'task') {
    const result = processTask(message.data);
    parentPort.postMessage({ type: 'result', data: result });
  } else {
    // Echo the message back
    parentPort.postMessage(`Worker echoing: ${message}`);
  }
});

// Example task processor
function processTask(data) {
  if (Array.isArray(data)) {
    return data.map(x => x * 2);
  }
  return null;
}

💬দ্রষ্টব্য:

থ্রেডগুলির মধ্যে প্রেরিত বার্তাগুলি মান দ্বারা অনুলিপি করা হয় (চলবে), রেফারেন্স দ্বারা ভাগ করা হয় না।

এর মানে হল যে আপনি যখন একটি থ্রেড থেকে অন্য থ্রেডে একটি বস্তু পাস করেন, তখন একটি থ্রেডের বস্তুর পরিবর্তন অন্য থ্রেডের অনুলিপিকে প্রভাবিত করবে না।

CPU- নিবিড় টাস্ক উদাহরণ

এখানে একটি খুব বাস্তব উদাহরণ যা CPU- আবদ্ধ কাজগুলির জন্য কর্মী থ্রেড ব্যবহার করার সুবিধা প্রদর্শন করে:

// fibonacci.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

// Recursive Fibonacci function (deliberately inefficient to simulate CPU load)
function fibonacci(n) {
  if (n <= 1) return n;
  return fibonacci(n - 1) + fibonacci(n - 2);
}

if (isMainThread) {
  // This code runs in the main thread
  
  // Function to run a worker
  function runFibonacciWorker(n) {
    return new Promise((resolve, reject) => {
      const worker = new Worker(__filename, { workerData: n });
      worker.on('message', resolve);
      worker.on('error', reject);
      worker.on('exit', (code) => {
        if (code !== 0) {
          reject(new Error(`Worker stopped with exit code ${code}`));
        }
      });
    });
  }
  
  // Measure execution time with and without workers
  async function run() {
    const numbers = [40, 41, 42, 43];
    
    // Using a single thread (blocking)
    console.time('Single thread');
    for (const n of numbers) {
      console.log(`Fibonacci(${n}) = ${fibonacci(n)}`);
    }
    console.timeEnd('Single thread');
    
    // Using worker threads (parallel)
    console.time('Worker threads');
    const results = await Promise.all(
      numbers.map(n => runFibonacciWorker(n))
    );
    for (let i = 0; i < numbers.length; i++) {
      console.log(`Fibonacci(${numbers[i]}) = ${results[i]}`);
    }
    console.timeEnd('Worker threads');
  }
  
  run().catch(err => console.error(err));
} else {
  // This code runs in worker threads
  
  // Calculate Fibonacci number
  const result = fibonacci(workerData);
  
  // Send the result back to the main thread
  parentPort.postMessage(result);
}

এই উদাহরণটি একটি একক-থ্রেড পদ্ধতি এবং কর্মী থ্রেডগুলির সাথে একটি বহু-থ্রেড পদ্ধতি উভয় ব্যবহার করে ফিবোনাচি সংখ্যা গণনা করে।

মাল্টি-কোর সিপিইউতে, ওয়ার্কার থ্রেড সংস্করণটি উল্লেখযোগ্যভাবে দ্রুত হওয়া উচিত কারণ একাধিক সিপিইউ কোর সমান্তরালে ফিবোনাচি সংখ্যা গণনা করতে ব্যবহার করা যেতে পারে।

⚠️সতর্কতা:

যদিও কর্মী থ্রেডগুলি CPU- আবদ্ধ কাজগুলির জন্য কার্যকারিতা উল্লেখযোগ্যভাবে উন্নত করতে পারে, তবে তাদের তৈরি এবং যোগাযোগের জন্য ওভারহেড রয়েছে। খুব ছোট কাজের জন্য, এই ওভারভিউ সুবিধাগুলিকে ছাড়িয়ে যেতে পারে।

কর্মী থ্রেডের সাথে ডেটা ভাগ করা

থ্রেডগুলির মধ্যে ডেটা ভাগ করার বিভিন্ন উপায় রয়েছে:

কপি পাঠানো হচ্ছে:PostMessage() ব্যবহার করার সময় ডিফল্ট আচরণ
মালিকানা হস্তান্তর:PostMessage() এর ট্রান্সফারলিস্ট প্যারামিটার ব্যবহার করে
মেমরি শেয়ার করা:SharedArrayBuffer ব্যবহার করে

ArrayBuffers পরিবর্তন

আপনি যখন একটি ArrayBuffer সংশোধন করেন, তখন আপনি ডেটা অনুলিপি না করে বাফারের মালিকানা এক থ্রেড থেকে অন্য থ্রেডে স্থানান্তর করেন। এটি বড় ডেটার জন্য খুব কার্যকর:

// transfer_main.js
const { Worker } = require('worker_threads');

// Create a large buffer
const buffer = new ArrayBuffer(100 * 1024 * 1024); // 100MB
const view = new Uint8Array(buffer);

// Fill with data
for (let i = 0; i < view.length; i++) {
  view[i] = i % 256;
}

console.log('Buffer created in main thread');
console.log('Buffer byteLength before transfer:', buffer.byteLength);

// Create a worker and transfer the buffer
const worker = new Worker('./transfer_worker.js');
worker.on('message', (message) => {
  console.log('Message from worker:', message);
  
  // After transfer, the buffer is no longer usable in main thread
  console.log('Buffer byteLength after transfer:', buffer.byteLength);
});

// Transfer ownership of the buffer to the worker
worker.postMessage({ buffer }, [buffer]);
// transfer_worker.js
const { parentPort } = require('worker_threads');

parentPort.on('message', ({ buffer }) => {
  const view = new Uint8Array(buffer);
  
  // Calculate sum to verify data
  let sum = 0;
  for (let i = 0; i < view.length; i++) {
    sum += view[i];
  }
  
  console.log('Buffer received in worker');
  console.log('Buffer byteLength in worker:', buffer.byteLength);
  console.log('Sum of all values:', sum);
  
  // Send confirmation back
  parentPort.postMessage('Buffer processed successfully');
});

🔄দ্রষ্টব্য:

একটি ArrayBuffer পরিবর্তন করার পরে, আসল বাফারটি অব্যবহারযোগ্য হয়ে যায় (এর বাইটলেংথ 0 হয়ে যায়)।

প্রাপ্ত থ্রেড বাফারে সম্পূর্ণ অ্যাক্সেস পায়।

SharedArrayBuffer এর সাথে মেমরি শেয়ার করা

এমন পরিস্থিতিতে যেখানে কপি বা অদলবদল ছাড়াই থ্রেডগুলির মধ্যে ডেটা ভাগ করা প্রয়োজন, SharedArrayBuffer একাধিক থ্রেড থেকে একই মেমরি অ্যাক্সেস করার একটি উপায় প্রদান করে।

🔒সতর্কতা:

Specter দুর্বলতা সম্পর্কিত নিরাপত্তা বিবেচনার কারণে কিছু Node.js সংস্করণে SharedArrayBuffer অক্ষম করা হতে পারে। আপনার Node.js সংস্করণের ডকুমেন্টেশন পরীক্ষা করে দেখুন কিভাবে প্রয়োজন হলে এটি সক্ষম করবেন।

// shared_main.js
const { Worker } = require('worker_threads');

// Create a shared buffer
const sharedBuffer = new SharedArrayBuffer(4 * 10); // 10 Int32 values
const sharedArray = new Int32Array(sharedBuffer);

// Initialize the shared array
for (let i = 0; i < sharedArray.length; i++) {
  sharedArray[i] = i;
}

console.log('Initial shared array in main thread:', [...sharedArray]);

// Create a worker that will update the shared memory
const worker = new Worker('./shared_worker.js', {
  workerData: { sharedBuffer }
});

worker.on('message', (message) => {
  console.log('Message from worker:', message);
  console.log('Updated shared array in main thread:', [...sharedArray]);
  
  // The changes made in the worker are visible here
  // because we're accessing the same memory
});
// shared_worker.js
const { parentPort, workerData } = require('worker_threads');
const { sharedBuffer } = workerData;

// Create a new view on the shared buffer
const sharedArray = new Int32Array(sharedBuffer);

console.log('Initial shared array in worker:', [...sharedArray]);

// Modify the shared memory
for (let i = 0; i < sharedArray.length; i++) {
  // Double each value
  sharedArray[i] = sharedArray[i] * 2;
}

console.log('Updated shared array in worker:', [...sharedArray]);

// Notify the main thread
parentPort.postMessage('Shared memory updated');

পারমাণবিকের সাথে অ্যাক্সেস সিঙ্ক্রোনাইজ করা

যখন একাধিক থ্রেড শেয়ার করা মেমরি অ্যাক্সেস করে, তখন রেসের অবস্থা রোধ করতে আপনার অ্যাক্সেস সিঙ্ক্রোনাইজ করার একটি উপায় প্রয়োজন।

অ্যাটমিক্স অবজেক্ট শেয়ার্ড মেমরি অ্যারেগুলিতে পারমাণবিক ক্রিয়াকলাপের জন্য পদ্ধতি সরবরাহ করে।

// atomics_main.js
const { Worker } = require('worker_threads');

// Create a shared buffer with control flags and data
const sharedBuffer = new SharedArrayBuffer(4 * 10);
const sharedArray = new Int32Array(sharedBuffer);

// Initialize values
sharedArray[0] = 0; // Control flag: 0 = main thread's turn, 1 = worker's turn
sharedArray[1] = 0; // Data value to increment

// Create workers
const workerCount = 4;
const workerIterations = 10;
const workers = [];

console.log(`Creating ${workerCount} workers with ${workerIterations} iterations each`);

for (let i = 0; i < workerCount; i++) {
  const worker = new Worker('./atomics_worker.js', {
    workerData: { sharedBuffer, id: i, iterations: workerIterations }
  });
  
  workers.push(worker);
  
  worker.on('exit', () => {
    console.log(`Worker ${i} exited`);
    
    // If all workers have exited, show final value
    if (workers.every(w => w.threadId === -1)) {
      console.log(`Final value: ${sharedArray[1]}`);
      console.log(`Expected value: ${workerCount * workerIterations}`);
    }
  });
}

// Signal to the first worker to start
Atomics.store(sharedArray, 0, 1);
Atomics.notify(sharedArray, 0);
// atomics_worker.js
const { parentPort, workerData } = require('worker_threads');
const { sharedBuffer, id, iterations } = workerData;

// Create a typed array from the shared memory
const sharedArray = new Int32Array(sharedBuffer);

for (let i = 0; i < iterations; i++) {
  // Wait for this worker's turn
  while (Atomics.load(sharedArray, 0) !== id + 1) {
    // Wait for notification
    Atomics.wait(sharedArray, 0, Atomics.load(sharedArray, 0));
  }
  
  // Increment the shared counter
  const currentValue = Atomics.add(sharedArray, 1, 1);
  console.log(`Worker ${id} incremented counter to ${currentValue + 1}`);
  
  // Signal to the next worker
  const nextWorkerId = (id + 1) % (iterations === 0 ? 1 : iterations);
  Atomics.store(sharedArray, 0, nextWorkerId + 1);
  Atomics.notify(sharedArray, 0);
}

// Exit the worker
parentPort.close();

দ্রষ্টব্য:

অ্যাটমিক্স অবজেক্টটি শেয়ার করা মেমরিতে অ্যাক্সেস সিঙ্ক্রোনাইজ করতে এবং থ্রেডগুলির মধ্যে সমন্বয় প্যাটার্নগুলি বাস্তবায়নের জন্য লোড, সঞ্চয়, যোগ, অপেক্ষা, এবং বিজ্ঞপ্তির মতো পদ্ধতি সরবরাহ করে।

শ্রমিক পুল তৈরি

বেশিরভাগ অ্যাপ্লিকেশনের জন্য, আপনি একই সাথে একাধিক কাজ পরিচালনা করার জন্য একটি কর্মী পুল তৈরি করতে চাইবেন।

এখানে একটি সাধারণ কর্মী পুল বাস্তবায়ন রয়েছে:

// worker_pool.js
const { Worker } = require('worker_threads');
const os = require('os');
const path = require('path');

class WorkerPool {
  constructor(workerScript, numWorkers = os.cpus().length) {
    this.workerScript = workerScript;
    this.numWorkers = numWorkers;
    this.workers = [];
    this.freeWorkers = [];
    this.tasks = [];
    
    // Initialize workers
    this._initialize();
  }
  
  _initialize() {
    // Create all workers
    for (let i = 0; i < this.numWorkers; i++) {
      this._createWorker();
    }
  }
  
  _createWorker() {
    const worker = new Worker(this.workerScript);
    
    worker.on('message', (result) => {
      // Get the current task
      const { resolve } = this.tasks.shift();
      
      // Resolve the task with the result
      resolve(result);
      
      // Add this worker back to the free workers pool
      this.freeWorkers.push(worker);
      
      // Process the next task if any
      this._processQueue();
    });
    
    worker.on('error', (err) => {
      // If a worker errors, terminate it and create a new one
      console.error(`Worker error: ${err}`);
      this._removeWorker(worker);
      this._createWorker();
      
      // Process the next task
      if (this.tasks.length > 0) {
        const { reject } = this.tasks.shift();
        reject(err);
        this._processQueue();
      }
    });
    
    worker.on('exit', (code) => {
      if (code !== 0) {
        console.error(`Worker exited with code ${code}`);
        this._removeWorker(worker);
        this._createWorker();
      }
    });
    
    // Add to free workers
    this.workers.push(worker);
    this.freeWorkers.push(worker);
  }
  
  _removeWorker(worker) {
    // Remove from the workers arrays
    this.workers = this.workers.filter(w => w !== worker);
    this.freeWorkers = this.freeWorkers.filter(w => w !== worker);
  }
  
  _processQueue() {
    // If there are tasks and free workers, process the next task
    if (this.tasks.length > 0 && this.freeWorkers.length > 0) {
      const { taskData } = this.tasks[0];
      const worker = this.freeWorkers.pop();
      worker.postMessage(taskData);
    }
  }
  
  // Run a task on a worker
  runTask(taskData) {
    return new Promise((resolve, reject) => {
      const task = { taskData, resolve, reject };
      this.tasks.push(task);
      this._processQueue();
    });
  }
  
  // Close all workers when done
  close() {
    for (const worker of this.workers) {
      worker.terminate();
    }
  }
}

module.exports = WorkerPool;

শ্রম পুল ব্যবহার করে:

// pool_usage.js
const WorkerPool = require('./worker_pool');
const path = require('path');

// Create a worker pool with the worker script
const pool = new WorkerPool(path.resolve(__dirname, 'pool_worker.js'));

// Function to run tasks on the pool
async function runTasks() {
  const tasks = [
    { type: 'fibonacci', data: 40 },
    { type: 'factorial', data: 15 },
    { type: 'prime', data: 10000000 },
    { type: 'fibonacci', data: 41 },
    { type: 'factorial', data: 16 },
    { type: 'prime', data: 20000000 },
    { type: 'fibonacci', data: 42 },
    { type: 'factorial', data: 17 },
  ];
  
  console.time('All tasks');
  
  try {
    // Run all tasks in parallel
    const results = await Promise.all(
      tasks.map(task => {
        console.time(`Task: ${task.type}(${task.data})`);
        return pool.runTask(task)
          .then(result => {
            console.timeEnd(`Task: ${task.type}(${task.data})`);
            return result;
          });
      })
    );
    
    // Log results
    for (let i = 0; i < tasks.length; i++) {
      console.log(`${tasks[i].type}(${tasks[i].data}) = ${results[i].result}`);
    }
  } catch (err) {
    console.error('Error running tasks:', err);
  } finally {
    console.timeEnd('All tasks');
    pool.close();
  }
}

runTasks().catch(console.error);
// pool_worker.js
const { parentPort } = require('worker_threads');

// Fibonacci function
function fibonacci(n) {
  if (n <= 1) return n;
  return fibonacci(n - 1) + fibonacci(n - 2);
}

// Factorial function
function factorial(n) {
  if (n <= 1) return 1;
  return n * factorial(n - 1);
}

// Prime count function
function countPrimes(max) {
  const sieve = new Uint8Array(max);
  let count = 0;
  
  for (let i = 2; i < max; i++) {
    if (!sieve[i]) {
      count++;
      for (let j = i * 2; j < max; j += i) {
        sieve[j] = 1;
      }
    }
  }
  
  return count;
}

// Handle messages from the main thread
parentPort.on('message', (task) => {
  const { type, data } = task;
  let result;
  
  // Perform different calculations based on task type
  switch (type) {
    case 'fibonacci':
      result = fibonacci(data);
      break;
    case 'factorial':
      result = factorial(data);
      break;
    case 'prime':
      result = countPrimes(data);
      break;
    default:
      throw new Error(`Unknown task type: ${type}`);
  }
  
  // Send the result back
  parentPort.postMessage({ result });
});

🏊দ্রষ্টব্য:

এই শ্রম পুল বাস্তবায়ন টাস্ক শিডিউলিং, শ্রম ত্রুটি এবং স্বয়ংক্রিয় শ্রম প্রতিস্থাপন পরিচালনা করে।

এটি বাস্তব-বিশ্বের অ্যাপ্লিকেশনগুলির জন্য একটি ভাল সূচনা বিন্দু, তবে কর্মী সময়সীমা এবং অগ্রাধিকারযুক্ত কাজগুলির মতো বৈশিষ্ট্যগুলির সাথে প্রসারিত করা যেতে পারে।

ব্যবহারিক প্রয়োগ: চিত্র প্রক্রিয়াকরণ

ইমেজ প্রসেসিং কর্মী থ্রেডের জন্য একটি আদর্শ ব্যবহারের ক্ষেত্রে কারণ এটি CPU- নিবিড় এবং সহজে সমান্তরাল।

এখানে সমান্তরাল চিত্র প্রক্রিয়াকরণের একটি উদাহরণ রয়েছে:

// image_main.js
const { Worker } = require('worker_threads');
const path = require('path');
const fs = require('fs');

// Function to process an image in a worker
function processImageInWorker(imagePath, options) {
  return new Promise((resolve, reject) => {
    const worker = new Worker('./image_worker.js', {
      workerData: {
        imagePath,
        options
      }
    });
    
    worker.on('message', resolve);
    worker.on('error', reject);
    worker.on('exit', (code) => {
      if (code !== 0) {
        reject(new Error(`Worker stopped with exit code ${code}`));
      }
    });
  });
}

// Main function to process multiple images in parallel
async function processImages() {
  const images = [
    { path: 'image1.jpg', options: { grayscale: true } },
    { path: 'image2.jpg', options: { blur: 5 } },
    { path: 'image3.jpg', options: { sharpen: 10 } },
    { path: 'image4.jpg', options: { resize: { width: 800, height: 600 } } }
  ];
  
  console.time('Image processing');
  
  try {
    // Process all images in parallel
    const results = await Promise.all(
      images.map(img => processImageInWorker(img.path, img.options))
    );
    
    console.log('All images processed successfully');
    console.log('Results:', results);
  } catch (err) {
    console.error('Error processing images:', err);
  }
  
  console.timeEnd('Image processing');
}

// Note: This is a conceptual example.
// In a real application, you would use an image processing library like sharp or jimp
// and provide actual image files.
// processImages().catch(console.error);
console.log('Image processing example (not actually running)');
// image_worker.js
const { parentPort, workerData } = require('worker_threads');
const { imagePath, options } = workerData;

// In a real application, you would import an image processing library here
// const sharp = require('sharp');

// Simulate image processing
function processImage(imagePath, options) {
  console.log(`Processing image: ${imagePath} with options:`, options);
  
  // Simulate processing time based on options
  let processingTime = 500; // Base time in ms
  
  if (options.grayscale) processingTime += 200;
  if (options.blur) processingTime += options.blur * 50;
  if (options.sharpen) processingTime += options.sharpen * 30;
  if (options.resize) processingTime += 300;
  
  // Simulate the actual processing
  return new Promise(resolve => {
    setTimeout(() => {
      // Return simulated result
      resolve({
        imagePath,
        outputPath: `processed_${imagePath}`,
        processing: options,
        dimensions: options.resize || { width: 1024, height: 768 },
        size: Math.floor(Math.random() * 1000000) + 500000 // Random file size
      });
    }, processingTime);
  });
}

// Process the image and send the result back
processImage(imagePath, options)
  .then(result => {
    parentPort.postMessage(result);
  })
  .catch(err => {
    throw err;
  });

কর্মী থ্রেড বনাম শিশু প্রক্রিয়া এবং ক্লাস্টার

অন্যান্য Node.js সমবর্তী প্রক্রিয়ার সাথে কর্মী থ্রেডের তুলনা করা গুরুত্বপূর্ণ:

বৈশিষ্ট্য Worker Threads Child Process Cluster
শেয়ার করা মেমরি হ্যাঁ (SharedArrayBuffer এর মাধ্যমে) না (শুধুমাত্র আইপিসি) না (শুধুমাত্র আইপিসি)
সম্পদের ব্যবহার অন্তত (ভাগ করা V8 ইভেন্ট) আরও (পৃথক প্রক্রিয়া) আরও (পৃথক প্রক্রিয়া)
শুরুর সময় দ্রুত ধীর ধীর
আলাদা করা অন্তত (ইভেন্ট লুপ ভাগ করে) আরও (সম্পূর্ণ প্রক্রিয়া বিচ্ছিন্নতা) আরও (সম্পূর্ণ প্রক্রিয়া বিচ্ছিন্নতা)
ব্যর্থতার প্রভাব পিতামাতা থ্রেড প্রভাবিত করতে পারেন শিশু প্রক্রিয়ায় সীমাবদ্ধ শ্রম প্রক্রিয়ায় সীমাবদ্ধ
সেরা CPU- নিবিড় কাজ বিভিন্ন কর্মসূচি চালাচ্ছেন স্কেলিং অ্যাপ্লিকেশন

কর্মী থ্রেড কখন ব্যবহার করবেন

CPU- আবদ্ধ কাজ যেমন সংখ্যাসূচক গণনা, চিত্র প্রক্রিয়াকরণ বা কম্প্রেশন
যখন সেরা পারফরম্যান্সের জন্য শেয়ার করা মেমরির প্রয়োজন হয়
একটি Node.js উদাহরণের মধ্যে সমান্তরাল জাভাস্ক্রিপ্ট কোড চালানোর সময়

কখন চাইল্ড প্রসেস ব্যবহার করবেন

বাহ্যিক প্রোগ্রাম বা কমান্ড চালানো
বিভিন্ন ভাষায় কাজ চলছে
যখন প্রধান প্রক্রিয়া এবং উদ্ভূত প্রক্রিয়াগুলির মধ্যে শক্তিশালী বিচ্ছিন্নতা প্রয়োজন

কখন ক্লাস্টার ব্যবহার করবেন

একাধিক কোর জুড়ে একটি HTTP সার্ভার স্কেলিং
লোড ব্যালেন্সিং ইনকামিং সংযোগ
অ্যাপ্লিকেশন নমনীয়তা এবং আপটাইম উন্নত করা

সর্বোত্তম অনুশীলন

থ্রেড অতিরিক্ত ব্যবহার করবেন না:শুধুমাত্র CPU-নিবিড় কাজের জন্য কর্মী থ্রেড ব্যবহার করুন যা মূল থ্রেড ব্লক করতে পারে
ওভারভিউ বিবেচনা করুন:থ্রেড তৈরি করার জন্য একটি ওভারভিউ আছে। খুব সংক্ষিপ্ত কাজের জন্য, এই ওভারভিউটি সুবিধাগুলিকে ছাড়িয়ে যেতে পারে
শ্রম পুল ব্যবহার করুন:প্রতিটি কাজের জন্য কর্মী তৈরি এবং ধ্বংস করার পরিবর্তে একাধিক কাজের জন্য কর্মীদের পুনরায় ব্যবহার করুন
ডেটা স্থানান্তর হ্রাস করুন:ArrayBuffer দিয়ে মালিকানা প্রতিস্থাপন করুন বা প্রচুর পরিমাণে ডেটা নিয়ে কাজ করার সময় SharedArrayBuffer ব্যবহার করুন
ত্রুটিগুলি সঠিকভাবে পরিচালনা করুন:সর্বদা কর্মীদের কাছ থেকে ত্রুটি ধরুন এবং কর্মীদের ব্যর্থতার জন্য একটি কৌশল রাখুন
শ্রম জীবনচক্র ট্র্যাক করুন:কর্মীদের স্বাস্থ্য নিরীক্ষণ করুন এবং তারা ক্র্যাশ হলে পুনরায় চালু করুন
উপযুক্ত সংযোগ ব্যবহার করুন:ভাগ করা মেমরি অ্যাক্সেস সমন্বয় করতে পরমাণু ব্যবহার করুন
আপনার সমাধান মানদণ্ড:থ্রেডগুলি সত্যিই সাহায্য করছে তা নিশ্চিত করতে সর্বদা কর্মক্ষমতা উন্নতি পরিমাপ করুন

⚠️সতর্কতা:

স্ক্রিপ্টিং আপনার কোডে জটিলতা যোগ করে। সমান্তরাল অপারেশনের জন্য প্রকৃত প্রয়োজন হলেই কর্মী থ্রেড ব্যবহার করুন। I/O-বাউন্ড ক্রিয়াকলাপগুলির জন্য, Node.js-এর অন্তর্নির্মিত অ্যাসিঙ্ক্রোনাস APIগুলি সাধারণত আরও দক্ষ।

সারাংশ

ওয়ার্কার থ্রেডস মডিউল Node.js-এ সত্যিকারের মাল্টি-থ্রেডিং ক্ষমতা প্রদান করে, যা মূল ইভেন্ট লুপকে ব্লক না করেই CPU- নিবিড় কাজগুলিকে সমান্তরালে চালানোর জন্য সক্ষম করে।

এই টিউটোরিয়ালে, আমরা কভার করেছি:

শ্রম টেক্সট কি এবং কখন ব্যবহার করা উচিত?
থ্রেডের মধ্যে বার্তা পাঠানো এবং গ্রহণ করা
CPU-নিবিড় কাজগুলির ব্যবহারিক উদাহরণ
ডেটা স্থানান্তর করা এবং শেয়ার করা মেমরি ব্যবহার করা
SharedArrayBuffer দিয়ে থ্রেডগুলির মধ্যে ডেটা ভাগ করা
অ্যাটমিক্সের সাথে থ্রেড অ্যাক্সেস সিঙ্ক্রোনাইজ করা হচ্ছে
দক্ষ টাস্ক ব্যবস্থাপনার জন্য একটি পুনর্ব্যবহারযোগ্য শ্রম পুল তৈরি করা
ব্যবহারিক অ্যাপ্লিকেশন যেমন সমান্তরাল চিত্র প্রক্রিয়াকরণ
অন্যান্য Node.js সমবর্তী মডেলের সাথে তুলনা
শ্রম পাঠ্যের কার্যকর ব্যবহারের জন্য সর্বোত্তম অনুশীলন

অনুশীলন করুন

সঠিক ক্লাসের নাম নির্বাচন করুন।

The ______ object is used to create a new worker thread in Node.js.

Thread
✗ ভুল! "থ্রেড" Node.js-এ একটি বৈধ শ্রেণী নয়
WorkerPool
✗ ভুল! "WorkerPool" একটি ব্যবহারকারী-সংজ্ঞায়িত শ্রেণী, Node.js-এ বিল্ট-ইন ক্লাস নয়
Worker
✓ ঠিক আছে! Node.js-এ একটি নতুন কর্মী থ্রেড তৈরি করতে "Worker" ক্লাস ব্যবহার করা হয়
ThreadPool
✗ ভুল! "ThreadPool" Node.js-এ একটি বৈধ শ্রেণী নয়